Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 32 additions & 11 deletions packages/agents-hosting-storage-blob/src/blobsStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ import StreamConsumers from 'stream/consumers'
import { isTokenCredential, TokenCredential } from '@azure/core-auth'
import {
AnonymousCredential,
BlobRequestConditions,
ContainerClient,
StoragePipelineOptions,
StorageSharedKeyCredential,
} from '@azure/storage-blob'
import { Storage, StoreItems } from '@microsoft/agents-hosting'
import { Storage, StorageWriteOptions, StoreItems } from '@microsoft/agents-hosting'
import { ExceptionHelper } from '@microsoft/agents-activity'
import { Errors } from './errorHelper'
import { sanitizeBlobKey } from './blobsTranscriptStore'
Expand Down Expand Up @@ -136,30 +137,50 @@ export class BlobsStorage implements Storage {
* @returns A promise that resolves when the write operation is complete
* @throws Will throw if there's a validation error, eTag conflict, or other storage error
*/
async write (changes: StoreItems): Promise<void> {
async write (changes: StoreItems, options?: StorageWriteOptions): Promise<StoreItems> {
z.record(z.unknown()).parse(changes)

await this._initialize()

await Promise.all(
Object.entries(changes).map(async ([key, { eTag = '', ...change }]) => {
const entries = Object.entries(changes)
const results = await Promise.all(
entries.map(async ([key, value]) => {
const { eTag = '', ...change } = value
const conditions: BlobRequestConditions = {}

if (options?.ifNotExists) {
logger.debug(`ifNoneMatch=* condition applied for key: ${key}`)
conditions.ifNoneMatch = '*'
} else if (typeof eTag === 'string' && eTag !== '*') {
logger.debug(`ifMatch=${eTag} condition applied for key: ${key}`)
conditions.ifMatch = eTag
}

try {
const blob = this._containerClient.getBlockBlobClient(sanitizeBlobKey(key))
const serialized = JSON.stringify(change)
logger.debug(`Writing blob: ${key}, eTag: ${eTag}, size: ${serialized.length}`)
return await blob.upload(serialized, serialized.length, {
conditions: typeof eTag === 'string' && eTag !== '*' ? { ifMatch: eTag } : {},
const item = await blob.upload(serialized, serialized.length, {
conditions,
blobHTTPHeaders: { blobContentType: 'application/json' },
})
} catch (err: any) {
if (err.statusCode === 412) {
throw ExceptionHelper.generateException(Error, Errors.ETagConflict)
} else {
throw err

return { key, eTag: item.etag }
} catch (cause: any) {
if (cause.statusCode === 409) {
throw ExceptionHelper.generateException(Error, Errors.ItemAlreadyExists, cause, { key })
}

if (cause.statusCode === 412) {
throw ExceptionHelper.generateException(Error, Errors.ETagConflict, cause, { key })
}

throw cause
}
})
)

return results.reduce((acc, { key, eTag }) => ({ ...acc, [key]: { eTag } }), {})
}

/**
Expand Down
12 changes: 10 additions & 2 deletions packages/agents-hosting-storage-blob/src/errorHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,18 @@ export const Errors: { [key: string]: AgentErrorDefinition } = {
},

/**
* Error thrown when there is an eTag conflict during storage write.
* Error thrown when there is an eTag conflict during a storage operation.
*/
ETagConflict: {
code: -160002,
description: 'Storage: error writing "{key}" due to eTag conflict.'
description: 'Unable to write \'{key}\' due to eTag conflict.'
},

/**
* Error thrown when attempting to create an item that already exists in storage.
*/
ItemAlreadyExists: {
code: -160003,
description: 'Unable to write \'{key}\' because it already exists.'
}
}
54 changes: 53 additions & 1 deletion packages/agents-hosting-storage-cosmos/CosmosDBErrorCodes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

This document provides detailed information about error codes in the Microsoft 365 Agents SDK for JavaScript CosmosDB storage package. Each error includes a description, context, and likely fixes.

## Storage - CosmosDB Errors (-100000 to -100019)
## Storage - CosmosDB Errors (-100000 to -100021)

### -100000
#### Missing CosmosDB Storage Options
Expand Down Expand Up @@ -472,3 +472,55 @@ const state = {
```

4. **Check for accidental object references** that create circular dependencies in your state data.

### -100020
#### ETag Conflict

Unable to write '{key}' due to eTag conflict.

**Description & Context:** This error occurs during write operations when an eTag conflict is detected. An eTag (entity tag) is a version identifier used for optimistic concurrency control in Cosmos DB. When you read a document, it includes an eTag value. If you then attempt to write that document back with a specific eTag (rather than using the wildcard '*' for last-write-wins), Cosmos DB checks if the eTag still matches the current version of the document. If another process has modified the document since you read it, the eTag will no longer match, and the write operation fails with a conflict. This error indicates that the state document was modified by another concurrent operation between the time you read it and when you attempted to write your changes.

**Likely Fix:** Choose one of two approaches:

1. **Use last-write-wins** (simplest): Set eTag to `'*'` to overwrite regardless of current version:
```typescript
const changes: StoreItems = {
'conversation-id': {
userState: { /* your state */ },
eTag: '*'
}
};
await storage.write(changes);
```

2. **Retry with re-read**: Read the latest version and retry with its current eTag:
```typescript
const items = await storage.read(['conversation-id']);
const item = items['conversation-id'];
const changes: StoreItems = {
'conversation-id': {
userState: { /* your state */ },
eTag: item?.eTag || '*'
}
};
await storage.write(changes);
```

### -100021
#### Item Already Exists

Unable to write '{key}' because it already exists.

**Description & Context:** This error occurs during write operations when attempting to create a new item in storage using a key that already exists in Cosmos DB. This error is typically thrown when using conditional write operations that enforce a "create-only" semantic (only allowing new documents to be created, not updates to existing ones). This can happen when the storage implementation has specific logic to prevent accidental overwrites of existing state, or when using Cosmos DB's conditional request logic with preconditions. The error indicates that the document you're trying to create with the given key is already present in the container.

**Likely Fix:** This error is expected when using `StorageWriteOptions.ifNotExists`. To avoid it, don't use the `ifNotExists` parameter:

```typescript
// Instead of:
// await storage.write(changes, { ifNotExists: true });

// Use:
await storage.write(changes);
```

This performs an upsert operation that handles both create and update scenarios.
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

import { Container, CosmosClient } from '@azure/cosmos'
import { Container, CosmosClient, ItemDefinition, ItemResponse, RequestOptions } from '@azure/cosmos'
import { CosmosDbKeyEscape } from './cosmosDbKeyEscape'
import { DocumentStoreItem } from './documentStoreItem'
import { CosmosDbPartitionedStorageOptions } from './cosmosDbPartitionedStorageOptions'
import { Storage, StoreItems } from '@microsoft/agents-hosting'
import { Storage, StorageWriteOptions, StoreItems } from '@microsoft/agents-hosting'
import { ExceptionHelper } from '@microsoft/agents-activity'
import { Errors } from './errorHelper'

Expand Down Expand Up @@ -170,47 +170,61 @@ export class CosmosDbPartitionedStorage implements Storage {
* Writes items to storage.
* @param changes The items to write.
*/
async write (changes: StoreItems): Promise<void> {
async write (changes: StoreItems, options?: StorageWriteOptions): Promise<StoreItems> {
if (!changes) {
throw ExceptionHelper.generateException(
ReferenceError,
Errors.MissingWriteChanges
)
} else if (changes.length === 0) {
return
} else if (Object.keys(changes).length === 0) {
return {}
}

await this.initialize()

await Promise.all(
Object.entries(changes).map(async ([key, { eTag, ...change }]): Promise<void> => {
const document = new DocumentStoreItem({
id: CosmosDbKeyEscape.escapeKey(
key,
this.cosmosDbStorageOptions.keySuffix,
this.cosmosDbStorageOptions.compatibilityMode
),
realId: key,
document: change,
})
const writePromises = Object.entries(changes).map(async ([key, value]) => {
const { eTag, ...change } = value
const requestOptions: RequestOptions = {}

const accessCondition =
eTag !== '*' && eTag != null && eTag.length > 0
? { accessCondition: { type: 'IfMatch', condition: eTag } }
: undefined
if (eTag !== '*' && eTag != null && eTag.length > 0) {
requestOptions.accessCondition = { type: 'IfMatch', condition: eTag }
}

try {
await this.container.items.upsert(document, accessCondition)
} catch (err: any) {
this.checkForNestingError(change, err)
throw ExceptionHelper.generateException(
Error,
Errors.DocumentUpsertError,
err
)
}
const document = new DocumentStoreItem({
id: CosmosDbKeyEscape.escapeKey(
key,
this.cosmosDbStorageOptions.keySuffix,
this.cosmosDbStorageOptions.compatibilityMode
),
realId: key,
document: change,
})
)

try {
let item: ItemResponse<ItemDefinition>
if (options?.ifNotExists) {
requestOptions.accessCondition = { type: 'IfNoneMatch', condition: '*' }
item = await this.container.items.create(document, requestOptions)
} else {
item = await this.container.items.upsert(document, requestOptions)
}
return { key, eTag: item.etag }
} catch (cause: any) {
if (cause.code === 409) {
throw ExceptionHelper.generateException(Error, Errors.ItemAlreadyExists, cause, { key })
}

if (cause.code === 412) {
throw ExceptionHelper.generateException(Error, Errors.ETagConflict, cause, { key })
}

this.checkForNestingError(change, cause)
throw ExceptionHelper.generateException(Error, Errors.DocumentUpsertError, cause)
}
})

const results = await Promise.all(writePromises)
return results.reduce((acc, { key, eTag }) => ({ ...acc, [key]: { eTag } }), {})
}

/**
Expand Down
16 changes: 16 additions & 0 deletions packages/agents-hosting-storage-cosmos/src/errorHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,5 +177,21 @@ export const Errors: { [key: string]: AgentErrorDefinition } = {
MaxNestingDepthExceeded: {
code: -100019,
description: 'Maximum nesting depth of {maxDepth} exceeded. {additionalMessage}'
},

/**
* Error thrown when there is an eTag conflict during a storage operation.
*/
ETagConflict: {
code: -100020,
description: 'Unable to write \'{key}\' due to eTag conflict.'
},

/**
* Error thrown when attempting to create an item that already exists in storage.
*/
ItemAlreadyExists: {
code: -100021,
description: 'Unable to write \'{key}\' because it already exists.'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ describe('Errors tests', () => {
val => val && typeof val === 'object' && 'code' in val && 'description' in val
) as AgentErrorDefinition[]

// All error codes should be negative and in the range -100000 to -100019
// All error codes should be negative and in the range -100000 to -100021
errorDefinitions.forEach(errorDef => {
assert.ok(errorDef.code < 0, `Error code ${errorDef.code} should be negative`)
assert.ok(errorDef.code >= -100019, `Error code ${errorDef.code} should be >= -100019`)
assert.ok(errorDef.code >= -100021, `Error code ${errorDef.code} should be >= -100021`)
assert.ok(errorDef.code <= -100000, `Error code ${errorDef.code} should be <= -100000`)
})
})
Expand Down
4 changes: 2 additions & 2 deletions packages/agents-hosting/src/app/auth/handlerStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@ export class HandlerStorage<TActiveHandler extends ActiveAuthorizationHandler =
/**
* Writes handler state to storage.
*/
public write (data: TActiveHandler) : Promise<void> {
public write (data: TActiveHandler) {
return this.storage.write({ [this.key]: data })
}

/**
* Deletes handler state from storage.
*/
public async delete (): Promise<void> {
public async delete () {
try {
await this.storage.delete([this.key])
} catch (error) {
Expand Down
Loading